/*
 * node_metadata.c
 *	  Functions that operate on pg_dist_node
 *
 * Copyright (c) Citus Data, Inc.
 */
#include "postgres.h"

#include "funcapi.h"
#include "miscadmin.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup.h"
#include "access/skey.h"
#include "access/tupmacs.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "distributed/commands/citus_sequence.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "postmaster/postmaster.h"
#include "storage/buf/bufmgr.h"
#include "storage/smgr/fd.h"
#include "storage/lmgr.h"
#include "storage/lock/lock.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/plancache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/numeric.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_node_metadata.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/string_utils.h"
#include "distributed/session_ctx.h"
#include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"

#define INVALID_GROUP_ID -1

typedef struct NodeMetadata {
    int32 groupId;
    char* nodeRack;
    bool hasMetadata;
    bool metadataSynced;
    bool isActive;
    Oid nodeRole;
    bool shouldHaveShards;
    char* nodeCluster;
} NodeMetadata;

/* local function forward declarations */
static void RemoveNodeFromCluster(char* nodeName, int32 nodePort);
static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode* workerNode);
static bool PlacementHasActivePlacementOnAnotherGroup(
    GroupShardPlacement* sourcePlacement);
static int AddNodeMetadata(char* nodeName, int32 nodePort, NodeMetadata* nodeMetadata,
                           bool* nodeAlreadyExists, bool localOnly);
static int AddNodeMetadataViaMetadataContext(char* nodeName, int32 nodePort,
                                             NodeMetadata* nodeMetadata,
                                             bool* nodeAlreadyExists);
static HeapTuple GetNodeTuple(const char* nodeName, int32 nodePort);
static HeapTuple GetNodeByNodeId(int32 nodeId);
static int32 GetNextGroupId(void);
static int GetNextNodeId(void);
static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char* nodename, int32 nodeport,
                          NodeMetadata* nodeMetadata);
static void DeleteNodeRow(char* nodename, int32 nodeport);
static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode* TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static bool NodeIsLocal(WorkerNode* worker);
static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char* newNodeName, int32 newNodePort,
                               bool localOnly);
static bool UnsetMetadataSyncedForAllWorkers(void);
static char* GetMetadataSyncCommandToSetNodeColumn(WorkerNode* workerNode,
                                                   int columnIndex, Datum value);
static char* NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata);
static char* NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode* workerNode, Datum value,
                                               char* field);
static WorkerNode* SetShouldHaveShards(WorkerNode* workerNode, bool shouldHaveShards);
static WorkerNode* FindNodeAnyClusterByNodeId(uint32 nodeId);
static void ErrorIfAnyNodeNotExist(List* nodeList);
static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext* context);
static void SendDeletionCommandsForReplicatedTablePlacements(
    MetadataSyncContext* context);
static void SyncNodeMetadata(MetadataSyncContext* context);
static void SetNodeStateViaMetadataContext(MetadataSyncContext* context,
                                           WorkerNode* workerNode, Datum value);
static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext* context,
                                                   pid_t parentSessionPid);
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
static void SetNodeMetadata(MetadataSyncContext* context, bool localOnly);
static void EnsureTransactionalMetadataSyncMode(void);
static void LockShardsInWorkerPlacementList(WorkerNode* workerNode, LOCKMODE lockMode);
#ifdef DISABLE_OG_COMMENTS
static BackgroundWorkerHandle* CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown);
static BackgroundWorkerHandle* LockPlacementsWithBackgroundWorkersInPrimaryNode(
    WorkerNode* workerNode, bool force, int32 lock_cooldown);
#endif

/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(spq_set_coordinator_host);
PG_FUNCTION_INFO_V1(spq_add_node);
PG_FUNCTION_INFO_V1(spq_add_inactive_node);
PG_FUNCTION_INFO_V1(spq_add_secondary_node);
PG_FUNCTION_INFO_V1(spq_set_node_property);
PG_FUNCTION_INFO_V1(spq_remove_node);
PG_FUNCTION_INFO_V1(spq_disable_node);
PG_FUNCTION_INFO_V1(spq_activate_node);
PG_FUNCTION_INFO_V1(spq_update_node);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
PG_FUNCTION_INFO_V1(spq_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);

extern "C" Datum citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS);
extern "C" Datum spq_is_coordinator(PG_FUNCTION_ARGS);
extern "C" Datum spq_update_node(PG_FUNCTION_ARGS);
extern "C" Datum spq_activate_node(PG_FUNCTION_ARGS);
extern "C" Datum spq_disable_node(PG_FUNCTION_ARGS);
extern "C" Datum spq_remove_node(PG_FUNCTION_ARGS);
extern "C" Datum spq_set_node_property(PG_FUNCTION_ARGS);
extern "C" Datum get_shard_id_for_distribution_column(PG_FUNCTION_ARGS);
extern "C" Datum spq_set_coordinator_host(PG_FUNCTION_ARGS);
extern "C" Datum spq_add_node(PG_FUNCTION_ARGS);
extern "C" Datum spq_add_inactive_node(PG_FUNCTION_ARGS);
extern "C" Datum spq_add_secondary_node(PG_FUNCTION_ARGS);

/*
 * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
 * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
 */
static NodeMetadata DefaultNodeMetadata()
{
    NodeMetadata nodeMetadata;

    /* ensure uninitialized padding doesn't escape the function */
    memset_struct_0(nodeMetadata);
    nodeMetadata.nodeRack = WORKER_DEFAULT_RACK;
    nodeMetadata.shouldHaveShards = true;
    nodeMetadata.groupId = INVALID_GROUP_ID;

    return nodeMetadata;
}

/*
 * spq_set_coordinator_host configures the hostname and port through which worker
 * nodes can connect to the coordinator.
 */
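/*
 * A minimal usage sketch (illustrative only; the node role and cluster arguments
 * are assumed to have defaults in the extension's SQL definition, which is not
 * part of this file):
 *
 *   SELECT spq_set_coordinator_host('coord.example.com', 5432);
 */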
Datum spq_set_coordinator_host(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    text* nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    char* nodeNameString = text_to_cstring(nodeName);

    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    nodeMetadata.groupId = 0;
    nodeMetadata.shouldHaveShards = false;
    nodeMetadata.nodeRole = PG_GETARG_OID(2);

    Name nodeClusterName = PG_GETARG_NAME(3);
    nodeMetadata.nodeCluster = NameStr(*nodeClusterName);

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) {
        EnsureTransactionalMetadataSyncMode();
    }

    /* prevent concurrent modification */
    LockRelationOid(DistNodeRelationId(), RowExclusiveLock);

    bool isCoordinatorInMetadata = false;
    WorkerNode* coordinatorNode =
        PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata);
    if (!isCoordinatorInMetadata) {
        bool nodeAlreadyExists = false;
        bool localOnly = false;

        /* add the coordinator to pg_dist_node if it was not already added */
        AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists,
                        localOnly);

        /* we just checked */
        Assert(!nodeAlreadyExists);
    } else {
        /*
         * since AddNodeMetadata takes an exclusive lock on pg_dist_node, we
         * do not need to worry about concurrent changes (e.g. deletion) and
         * can proceed to update immediately.
         */
        bool localOnly = false;
        UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort, localOnly);

        /* clear cached plans that have the old host/port */
        ResetPlanCache();
    }

    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_VOID();
}

/*
 * EnsureTransactionalMetadataSyncMode errors out if the metadata sync mode
 * is not transactional.
 */
static void EnsureTransactionalMetadataSyncMode(void)
{
    if (Session_ctx::Vars().MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) {
        ereport(ERROR, (errmsg("this operation cannot be completed in nontransactional "
                               "metadata sync mode"),
                        errhint("SET spq.metadata_sync_mode to 'transactional'")));
    }
}

/*
 * spq_add_node function adds a new node to the cluster and returns its id. It also
 * replicates all reference tables to the new node.
 */
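/*
 * A minimal usage sketch (illustrative only; the groupid, noderole and nodecluster
 * arguments are assumed to have defaults in the extension's SQL definition, which
 * is not part of this file):
 *
 *   SELECT spq_add_node('worker-1.example.com', 5432);
 */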
Datum spq_add_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    EnsureSuperUser();
    EnsureCoordinator();

    text* nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    char* nodeNameString = text_to_cstring(nodeName);

    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    bool nodeAlreadyExists = false;
    nodeMetadata.groupId = PG_GETARG_INT32(2);

    /*
     * During tests this function is called before nodeRole and nodeCluster have been
     * created.
     */
    if (PG_NARGS() == 3) {
        nodeMetadata.nodeRole = InvalidOid;
        nodeMetadata.nodeCluster = "default";
    } else {
        Name nodeClusterName = PG_GETARG_NAME(4);
        nodeMetadata.nodeCluster = NameStr(*nodeClusterName);

        nodeMetadata.nodeRole = PG_GETARG_OID(3);
    }

    if (nodeMetadata.groupId == COORDINATOR_GROUP_ID) {
        /* by default, we add the coordinator without shards */
        nodeMetadata.shouldHaveShards = false;
    }

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) {
        EnsureTransactionalMetadataSyncMode();
    }

    if (Session_ctx::Vars().MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL &&
        IsMultiStatementTransaction()) {
        /*
         * prevent running inside a transaction block as we use bare connections,
         * which can lead to deadlocks
         */
        ereport(ERROR, (errmsg("do not add node in transaction block "
                               "when the sync mode is nontransactional"),
                        errhint("add the node after SET spq.metadata_sync_mode "
                                "TO 'transactional'")));
    }

    int nodeId = AddNodeMetadataViaMetadataContext(nodeNameString, nodePort,
                                                   &nodeMetadata, &nodeAlreadyExists);
    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_INT32(nodeId);
}

/*
 * spq_add_inactive_node function adds a new node to the cluster as an inactive
 * node and returns the id of the newly added node. It does not replicate
 * reference tables to the new node; it only adds the new node to the
 * pg_dist_node table.
 */
Datum spq_add_inactive_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    text* nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    char* nodeNameString = text_to_cstring(nodeName);
    Name nodeClusterName = PG_GETARG_NAME(4);

    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    bool nodeAlreadyExists = false;
    nodeMetadata.groupId = PG_GETARG_INT32(2);
    nodeMetadata.nodeRole = PG_GETARG_OID(3);
    nodeMetadata.nodeCluster = NameStr(*nodeClusterName);

    if (nodeMetadata.groupId == COORDINATOR_GROUP_ID) {
        ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node")));
    }

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) {
        EnsureTransactionalMetadataSyncMode();
    }

    bool localOnly = false;
    int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
                                 &nodeAlreadyExists, localOnly);
    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_INT32(nodeId);
}

/*
 * spq_add_secondary_node adds a new secondary node to the cluster. It accepts as
 * arguments the primary node it should share a group with.
 */
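/*
 * A minimal usage sketch (illustrative only; the nodecluster argument is assumed
 * to have a default in the extension's SQL definition, which is not part of this
 * file):
 *
 *   SELECT spq_add_secondary_node('standby-1.example.com', 5432,
 *                                 'worker-1.example.com', 5432);
 */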
Datum spq_add_secondary_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    text* nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    char* nodeNameString = text_to_cstring(nodeName);

    text* primaryName = PG_GETARG_TEXT_P(2);
    int32 primaryPort = PG_GETARG_INT32(3);
    char* primaryNameString = text_to_cstring(primaryName);

    Name nodeClusterName = PG_GETARG_NAME(4);
    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    bool nodeAlreadyExists = false;

    nodeMetadata.groupId = GroupForNode(primaryNameString, primaryPort);
    nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
    nodeMetadata.nodeRole = SecondaryNodeRoleId();
    nodeMetadata.isActive = true;

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    EnsureTransactionalMetadataSyncMode();

    bool localOnly = false;
    int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
                                 &nodeAlreadyExists, localOnly);
    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_INT32(nodeId);
}

/*
 * spq_remove_node function removes the provided node from the pg_dist_node table of
 * the master node and all nodes with metadata.
 * spq_remove_node must be called by a superuser, and the specified node must not
 * have any active placements.
 * This function also deletes all reference table placements belonging to the given
 * node from pg_dist_placement, but it does not drop the actual placements at the
 * node. In the case of re-adding the node, spq_add_node first drops and re-creates
 * the reference tables.
 */
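/*
 * A minimal usage sketch (illustrative only):
 *
 *   SELECT spq_remove_node('worker-1.example.com', 5432);
 */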
Datum spq_remove_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    text* nodeNameText = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);

    RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_VOID();
}

/*
 * spq_disable_node function sets the isactive value of the provided node to false at
 * the coordinator and all nodes with metadata, regardless of whether the node has an
 * active shard placement.
 *
 * spq_disable_node must be called by a superuser.
 *
 * This function also deletes all reference table placements belonging to the given
 * node from pg_dist_placement, but it does not drop the actual placements at the
 * node. In the case of re-activating the node, spq_add_node first drops and
 * re-creates the reference tables.
 */
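/*
 * A minimal usage sketch (illustrative only; the synchronous argument is optional,
 * as the PG_NARGS() check below shows):
 *
 *   SELECT spq_disable_node('worker-1.example.com', 5432);
 *   SELECT spq_disable_node('worker-1.example.com', 5432, synchronous:=true);
 */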
Datum spq_disable_node(PG_FUNCTION_ARGS)
{
    text* nodeNameText = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);

    bool synchronousDisableNode = true;
    Assert(PG_NARGS() == 2 || PG_NARGS() == 3);
    if (PG_NARGS() == 3) {
        synchronousDisableNode = PG_GETARG_BOOL(2);
    }

    char* nodeName = text_to_cstring(nodeNameText);
    WorkerNode* workerNode = ModifiableWorkerNode(nodeName, nodePort);

    /* there is no concept of invalid coordinator */
    bool isActive = false;
    ErrorIfCoordinatorMetadataSetFalse(workerNode, BoolGetDatum(isActive), "isactive");

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (NodeIsSecondary(workerNode)) {
        EnsureTransactionalMetadataSyncMode();
    }

    WorkerNode* firstWorkerNode = GetFirstPrimaryWorkerNode();
    bool disablingFirstNode =
        (firstWorkerNode && firstWorkerNode->nodeId == workerNode->nodeId);

    if (disablingFirstNode && !synchronousDisableNode) {
        /*
         * We sync metadata asynchronously and optionally in the background
         * worker, which means that some nodes might get the updates while
         * others do not. And, if the node metadata that is changing belongs to
         * the first worker node, the problem gets nasty, because we serialize
         * modifications to replicated tables by acquiring locks on the first
         * worker node.
         *
         * If some nodes get the metadata changes and some do not, they'd be
         * acquiring the locks on different nodes, opening up the possibility
         * of diverged shard placements for the same shard.
         *
         * To prevent that, we currently do not allow disabling the first
         * worker node unless synchronous mode is explicitly requested.
         */
        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                        errmsg("disabling the first worker node in the "
                               "metadata is not allowed"),
                        errhint("You can force disabling node, SELECT "
                                "spq_disable_node('%s', %d, "
                                "synchronous:=true);",
                                workerNode->workerName, nodePort),
                        errdetail("Citus uses the first worker node in the "
                                  "metadata for certain internal operations when "
                                  "replicated tables are modified. Synchronous mode "
                                  "ensures that all nodes have the same view of the "
                                  "first worker node, which is used for certain "
                                  "locking operations.")));
    }

    /*
     * First, locally mark the node as inactive. We'll later trigger background
     * worker to sync the metadata changes to the relevant nodes.
     */
    workerNode = SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
                                          BoolGetDatum(isActive));
    if (NodeIsPrimary(workerNode)) {
        /*
         * We do not allow disabling nodes if it contains any
         * primary placement that is the "only" active placement
         * for any given shard.
         */
        ErrorIfNodeContainsNonRemovablePlacements(workerNode);
    }

    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    if (synchronousDisableNode) {
        /*
         * The user might pick between the sync and async options.
         *      - Pros for the sync option:
         *          (a) the changes become visible on the cluster immediately
         *          (b) even if the first worker node is disabled, there is no
         *              risk of divergence of the placements of replicated shards
         *      - Cons for the sync option:
         *          (a) Does not work within a 2PC transaction (e.g., BEGIN;
         *              spq_disable_node(); PREPARE TRANSACTION ...);
         *          (b) If there are multiple node failures (e.g., another node
         *              besides the one being disabled), the sync option would
         *              fail because it'd try to sync the metadata changes to a node
         *              that is not up and running.
         */
        if (firstWorkerNode && firstWorkerNode->nodeId == workerNode->nodeId) {
            /*
             * We cannot let any modification query on a replicated table to run
             * concurrently with spq_disable_node() on the first worker node. If
             * we let that, some worker nodes might calculate FirstWorkerNode()
             * different than others. See LockShardListResourcesOnFirstWorker()
             * for the details.
             */
            BlockDistributedQueriesOnMetadataNodes();
        }

        SyncNodeMetadataToNodes();
    } else if (UnsetMetadataSyncedForAllWorkers()) {
        /*
         * We have not propagated the node metadata changes yet, so make sure that
         * all the active nodes get the metadata updates. We defer this operation to
         * the background worker to make it possible to disable nodes when multiple
         * nodes are down.
         *
         * Note that the active placements reside on the active nodes. Hence, when
         * Citus finds active placements, it filters out the placements that are on
         * the disabled nodes. That's why, we don't have to change/sync placement
         * metadata at this point. Instead, we defer that to spq_activate_node()
         * where we expect all nodes up and running.
         */

        TriggerNodeMetadataSyncOnCommit();
    }

    PG_RETURN_VOID();
}

/*
 * BlockDistributedQueriesOnMetadataNodes blocks all the modification queries on
 * all nodes. Hence, it should be used with caution.
 */
static void BlockDistributedQueriesOnMetadataNodes(void)
{
    /* first, block on the coordinator */
    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    /*
     * Note that we might re-design this lock to be more granular than
     * pg_dist_node, scoping only for modifications on the replicated
     * tables. However, we currently do not have any such mechanism and
     * given that spq_disable_node() runs instantly, it seems acceptable
     * to block reads (or modifications on non-replicated tables) for
     * a while.
     */

    /* only superuser can disable node */
    Assert(superuser());

    SendCommandToWorkersWithMetadata(
        "LOCK TABLE pg_catalog.pg_dist_node IN EXCLUSIVE MODE;");
}

/*
 * spq_set_node_property sets a boolean property of the given node. Currently
 * only the 'shouldhaveshards' property is supported.
 */
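/*
 * A minimal usage sketch (illustrative only; 'shouldhaveshards' is the only
 * property the function accepts):
 *
 *   SELECT spq_set_node_property('worker-1.example.com', 5432,
 *                                'shouldhaveshards', false);
 */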
Datum spq_set_node_property(PG_FUNCTION_ARGS)
{
    text* nodeNameText = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    text* propertyText = PG_GETARG_TEXT_P(2);
    bool value = PG_GETARG_BOOL(3);

    WorkerNode* workerNode =
        ModifiableWorkerNode(text_to_cstring(nodeNameText), nodePort);

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (NodeIsSecondary(workerNode)) {
        EnsureTransactionalMetadataSyncMode();
    }

    if (strcmp(text_to_cstring(propertyText), "shouldhaveshards") == 0) {
        SetShouldHaveShards(workerNode, value);
    } else {
        ereport(
            ERROR,
            (errmsg(
                "only the 'shouldhaveshards' property can be set using this function")));
    }

    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_VOID();
}

/*
 * ModifiableWorkerNode gets the requested WorkerNode and also gets locks
 * required for modifying it. This fails if the node does not exist.
 */
WorkerNode* ModifiableWorkerNode(const char* nodeName, int32 nodePort)
{
    CheckCitusVersion(ERROR);
    EnsureCoordinator();

    /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    WorkerNode* workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
    if (workerNode == NULL) {
        ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
    }

    return workerNode;
}

/*
 * spq_activate_node UDF activates the given node. It sets the node's isactive
 * value to active and replicates all reference tables to that node.
 */
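/*
 * A minimal usage sketch (illustrative only):
 *
 *   SELECT spq_activate_node('worker-1.example.com', 5432);
 */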
Datum spq_activate_node(PG_FUNCTION_ARGS)
{
    text* nodeNameText = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);

    char* nodeNameString = text_to_cstring(nodeNameText);
    WorkerNode* workerNode = ModifiableWorkerNode(nodeNameString, nodePort);

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (NodeIsSecondary(workerNode)) {
        EnsureTransactionalMetadataSyncMode();
    }

    /*
     * Create MetadataSyncContext which is used throughout nodes' activation.
     * It contains activated nodes, bare connections if the mode is nontransactional,
     * and a memory context for allocation.
     */
    bool collectCommands = false;
    bool nodesAddedInSameTransaction = false;
    MetadataSyncContext* context = CreateMetadataSyncContext(
        list_make1(workerNode), collectCommands, nodesAddedInSameTransaction);

    ActivateNodeList(context);

    /* if metadata sync is disabled, we only need to change local pg_dist_node. */
    if (!Session_ctx::Vars().EnableMetadataSync) {
        auto node =
            SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(true));
        if (!node->isActive) {
            ereport(ERROR, (errmsg("failed to activate node: \"%s:%d\"",
                                   workerNode->workerName, workerNode->workerPort)));
        }
    }

    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_INT32(workerNode->nodeId);
}

/*
 * GroupForNode returns the group which a given node belongs to.
 *
 * It only works if the requested node is a part of CurrentCluster.
 */
uint32 GroupForNode(char* nodeName, int nodePort)
{
    WorkerNode* workerNode = FindWorkerNode(nodeName, nodePort);

    if (workerNode == NULL) {
        ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
    }

    return workerNode->groupId;
}

/*
 * NodeIsPrimaryAndRemote returns whether the argument represents the remote
 * primary node.
 */
bool NodeIsPrimaryAndRemote(WorkerNode* worker)
{
    return NodeIsPrimary(worker) && !NodeIsLocal(worker);
}

/*
 * NodeIsPrimary returns whether the argument represents a primary node.
 */
bool NodeIsPrimary(WorkerNode* worker)
{
    Oid primaryRole = PrimaryNodeRoleId();

    /* if nodeRole does not yet exist, all nodes are primary nodes */
    if (primaryRole == InvalidOid) {
        return true;
    }

    return worker->nodeRole == primaryRole;
}

/*
 * NodeIsLocal returns whether the argument represents the local node.
 */
static bool NodeIsLocal(WorkerNode* worker)
{
    return worker->groupId == GetLocalGroupId();
}

/*
 * NodeIsSecondary returns whether the argument represents a secondary node.
 */
bool NodeIsSecondary(WorkerNode* worker)
{
    Oid secondaryRole = SecondaryNodeRoleId();

    /* if nodeRole does not yet exist, all nodes are primary nodes */
    if (secondaryRole == InvalidOid) {
        return false;
    }

    return worker->nodeRole == secondaryRole;
}

/*
 * NodeIsReadable returns whether we're allowed to send SELECT queries to this
 * node.
 */
bool NodeIsReadable(WorkerNode* workerNode)
{
    if (Session_ctx::Vars().ReadFromSecondaries == USE_SECONDARY_NODES_NEVER &&
        NodeIsPrimary(workerNode)) {
        return true;
    }

    if (Session_ctx::Vars().ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS &&
        NodeIsSecondary(workerNode)) {
        return true;
    }

    return false;
}

/*
 * PrimaryNodeForGroup returns the (unique) primary in the specified group.
 *
 * If there are any nodes in the requested group and groupContainsNodes is not NULL
 * it will set the bool groupContainsNodes references to true.
 */
WorkerNode* PrimaryNodeForGroup(int32 groupId, bool* groupContainsNodes)
{
    WorkerNode* workerNode = NULL;
    HASH_SEQ_STATUS status;
    HTAB* workerNodeHash = GetWorkerNodeHash();

    hash_seq_init(&status, workerNodeHash);

    while ((workerNode = static_cast<WorkerNode*>(hash_seq_search(&status))) != NULL) {
        int32 workerNodeGroupId = workerNode->groupId;
        if (workerNodeGroupId != groupId) {
            continue;
        }

        if (groupContainsNodes != NULL) {
            *groupContainsNodes = true;
        }

        if (NodeIsPrimary(workerNode)) {
            hash_seq_term(&status);
            return workerNode;
        }
    }

    return NULL;
}

/*
 * MarkNodesNotSyncedInLoopBackConnection unsets the metadatasynced flag over a
 * separate connection to localhost by calling the UDF
 * `citus_internal_mark_node_not_synced`.
 */
static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext* context,
                                                   pid_t parentSessionPid)
{
    Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL);
    Assert(!MetadataSyncCollectsCommands(context));

    /*
     * Set metadatasynced to false for all activated nodes to mark them as not synced
     * in case nontransactional metadata sync fails before we activate the nodes inside
     * metadataSyncContext.
     * We only set metadatasynced to false at the coordinator; we do not set the
     * isactive and hasmetadata flags to false, as we still want to route queries to
     * the nodes while their isactive flag is true and propagate DDL to the nodes if
     * possible.
     *
     * NOTES:
     * 1) We use a separate connection to localhost because we would roll back the
     * local transaction in case of failure.
     * 2) The operator should handle problems at the workers, if any. Workers will
     * probably fail due to improper metadata when a query hits them, or DDL might
     * fail due to desynced nodes (when hasmetadata = true, metadatasynced = false).
     * In those cases, proper metadata sync for the workers should be done.
     */

    /*
     * Because we try to unset metadatasynced flag with a separate transaction,
     * we could not find the new node if the node is added in the current local
     * transaction. But, hopefully, we do not need to unset metadatasynced for
     * the new node as local transaction would rollback in case of a failure.
     */
    if (context->nodesAddedInSameTransaction) {
        return;
    }

    if (context->activatedWorkerNodeList == NIL) {
        return;
    }

    int connectionFlag = FORCE_NEW_CONNECTION;
    MultiConnection* connection =
        GetNodeConnection(connectionFlag, Session_ctx::Vars().LocalHostName,
                          g_instance.attr.attr_network.PostPortNumber);

    List* commandList = NIL;
    WorkerNode* workerNode = NULL;
    foreach_declared_ptr(workerNode, context->activatedWorkerNodeList)
    {
        /*
         * We need to prevent self deadlock when we access pg_dist_node using separate
         * connection to localhost. To achieve this, we check if the caller session's
         * pid holds the Exclusive lock on pg_dist_node. After ensuring that (we are
         * called from parent session which holds the Exclusive lock), we can safely
         * update node metadata by acquiring the relaxed lock.
         */
        StringInfo metadatasyncCommand = makeStringInfo();
        appendStringInfo(metadatasyncCommand, CITUS_INTERNAL_MARK_NODE_NOT_SYNCED,
                         parentSessionPid, workerNode->nodeId);
        commandList = lappend(commandList, metadatasyncCommand->data);
    }

    SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList);
    CloseConnection(connection);
}

/*
 * SetNodeMetadata sets isactive, metadatasynced and hasmetadata flags locally
 * and, if required, remotely.
 */
static void SetNodeMetadata(MetadataSyncContext* context, bool localOnly)
{
    /* do not execute local transaction if we collect commands */
    if (!MetadataSyncCollectsCommands(context)) {
        List* updatedActivatedNodeList = NIL;

        WorkerNode* node = NULL;
        foreach_declared_ptr(node, context->activatedWorkerNodeList)
        {
            node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive,
                                            BoolGetDatum(true));
            node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_metadatasynced,
                                            BoolGetDatum(true));
            node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_hasmetadata,
                                            BoolGetDatum(true));

            updatedActivatedNodeList = lappend(updatedActivatedNodeList, node);
        }

        /* reset activated nodes inside metadataSyncContext after local update */
        SetMetadataSyncNodesFromNodeList(context, updatedActivatedNodeList);
    }

    if (!localOnly && Session_ctx::Vars().EnableMetadataSync) {
        WorkerNode* node = NULL;
        foreach_declared_ptr(node, context->activatedWorkerNodeList)
        {
            SetNodeStateViaMetadataContext(context, node, BoolGetDatum(true));
        }
    }
}

/*
 * ActivateNodeList does some sanity checks, acquires an Exclusive lock on
 * pg_dist_node, and then activates the nodes inside the given metadataSyncContext.
 *
 * The function operates in 3 different modes according to transactionMode inside
 * metadataSyncContext.
 *
 * 1. MetadataSyncCollectsCommands(context):
 *      Only collect commands instead of sending them to workers,
 * 2. context.transactionMode == METADATA_SYNC_TRANSACTIONAL:
 *      Send all commands using coordinated transaction,
 * 3. context.transactionMode == METADATA_SYNC_NON_TRANSACTIONAL:
 *      Send all commands using bare (no transaction block) connections.
 */
void ActivateNodeList(MetadataSyncContext* context)
{
    if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL &&
        IsMultiStatementTransaction()) {
        /*
         * prevent running inside a transaction block as we use bare connections,
         * which can lead to deadlocks
         */
        ereport(ERROR, (errmsg("do not sync metadata in transaction block "
                               "when the sync mode is nontransactional"),
                        errhint("resync after SET spq.metadata_sync_mode "
                                "TO 'transactional'")));
    }

    /*
     * We currently require the object propagation to happen via superuser,
     * see #5139. While activating a node, we sync both metadata and object
     * propagation.
     *
     * In order to have a fully transactional semantics with add/activate
     * node operations, we require superuser. Note that for creating
     * non-owned objects, we already require a superuser connection.
     * By ensuring the current user to be a superuser, we can guarantee
     * to send all commands within the same remote transaction.
     */
    EnsureSuperUser();

    /*
     * Take an exclusive lock on pg_dist_node to serialize pg_dist_node
     * changes.
     */
    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    /*
     * Error if there is concurrent change to node table before acquiring
     * the lock
     */
    ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList);

    /*
     * We need to set the metadatasynced flag to false at the coordinator in a
     * separate transaction, but only in nontransactional sync mode and when we
     * are not collecting commands.
     *
     * We make sure we set the flag to false at the start of nontransactional
     * metadata sync to mark those nodes as not synced in case of a failure in
     * the middle of the sync.
     */
    if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL &&
        !MetadataSyncCollectsCommands(context)) {
        MarkNodesNotSyncedInLoopBackConnection(context, t_thrd.proc_cxt.MyProcPid);
    }

    /*
     * Delete existing reference and replicated table placements on the
     * given groupId if the group has been disabled earlier (e.g., isActive
     * set to false).
     */
    SendDeletionCommandsForReplicatedTablePlacements(context);

    /*
     * SetNodeMetadata sets the isactive, metadatasynced and hasmetadata flags
     * locally for the following reasons:
     *
     * 1) Set isactive to true locally so that we can find activated nodes amongst
     *    active workers,
     * 2) Do not fail just because the current metadata is not synced (see
     *    ErrorIfAnyMetadataNodeOutOfSync),
     * 3) To propagate the activated nodes' metadata correctly.
     *
     * Since we are going to sync the metadata in this transaction anyway, we set
     * isactive, metadatasynced, and hasmetadata to true locally.
     * The changes will roll back in case of failure.
     */
    bool localOnly = true;
    SetNodeMetadata(context, localOnly);

    /*
     * Update local group ids so that upcoming transactions can see its effect.
     * Object dependency logic requires to have updated local group id.
     */
    UpdateLocalGroupIdsViaMetadataContext(context);

    /*
     * Sync node metadata so that placement insertion does not fail due to
     * EnsureShardPlacementMetadataIsSane.
     */
    SyncNodeMetadata(context);

    /*
     * Sync all dependencies and distributed objects with their pg_dist_xx tables to
     * metadata nodes inside metadataSyncContext. Depends on node metadata.
     */
    SyncDistributedObjects(context);

    /*
     * Mark all nodes as active and synced after all operations have succeeded.
     * We make sure that the metadata sync is idempotent and safe overall with
     * multiple other transactions, if nontransactional mode is used.
     *
     * We already took an Exclusive lock on node metadata, which prevents
     * modification of node metadata on the coordinator. In case of a failure,
     * this step will roll back to the state where metadatasynced=false.
     */
    localOnly = false;
    SetNodeMetadata(context, localOnly);
}

/*
 * LockShardsInWorkerPlacementList acquires shard metadata locks on all shards
 * residing on the given worker node.
 *
 * TODO: This function is not compatible with the query-from-any-node feature.
 * To ensure proper behavior, it is essential to acquire locks on placements across
 * all nodes rather than limiting it to just the coordinator (or the specific node
 * from which this function is called).
 */
void LockShardsInWorkerPlacementList(WorkerNode* workerNode, LOCKMODE lockMode)
{
    List* placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
    LockShardsInPlacementListMetadata(placementList, lockMode);
}

#ifdef DISABLE_OG_COMMENTS
/*
 * CheckBackgroundWorkerToObtainLocks starts a background worker that kills backends
 * holding locks conflicting with this backend. It returns NULL if the background
 * worker could not be started.
 */
BackgroundWorkerHandle* CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown)
{
    BackgroundWorkerHandle* handle =
        StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
    if (!handle) {
        /*
         * We failed to start a background worker, which probably means that we exceeded
         * max_worker_processes, and this is unlikely to be resolved by retrying. We do
         * not want to repeatedly throw an error because if spq_update_node is called to
         * complete a failover then finishing is the only way to bring the cluster back
         * up. Therefore we give up on killing other backends and simply wait for the
         * lock. We do set lock_timeout to lock_cooldown, because we don't want to wait
         * forever to get a lock.
         */
        SetLockTimeoutLocally(lock_cooldown);
        ereport(
            WARNING,
            (errmsg("could not start background worker to kill backends with conflicting"
                    " locks to force the update. Degrading to acquiring locks "
                    "with a lock time out."),
             errhint("Increasing max_worker_processes might help.")));
    }
    return handle;
}

/*
 * LockPlacementsWithBackgroundWorkersInPrimaryNode locks shards on a primary node.
 * If force is true, we start a background worker to kill backends holding
 * conflicting locks with this backend.
 *
 * If the node is a primary node we block reads and writes.
 *
 * This lock has two purposes:
 *
 * - Ensure buggy code in Citus doesn't cause failures when the
 *   nodename/nodeport of a node changes mid-query
 *
 * - Provide fencing during failover, after this function returns all
 *   connections will use the new node location.
 *
 * Drawback:
 *
 * - This function blocks until all previous queries have finished. This
 *   means that long-running queries will prevent failover.
 *
 *   In case of node failure said long-running queries will fail in the end
 *   anyway as they will be unable to commit successfully on the failed
 *   machine. To cause quick failure of these queries use force => true
 *   during the invocation of spq_update_node to terminate conflicting
 *   backends proactively.
 *
 * It might be worth blocking reads to a secondary for the same reasons,
 * though we currently only query secondaries on follower clusters
 * where these locks will have no effect.
 */
BackgroundWorkerHandle* LockPlacementsWithBackgroundWorkersInPrimaryNode(
    WorkerNode* workerNode, bool force, int32 lock_cooldown)
{
    BackgroundWorkerHandle* handle = NULL;

    if (NodeIsPrimary(workerNode)) {
        if (force) {
            handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown);
        }
        LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
    }
    return handle;
}
#endif

/*
 * spq_update_node moves the requested node to a different nodename and nodeport. It
 * takes locks to ensure no queries are running concurrently, and is intended for
 * customers who are running their own failover solution.
 */
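/*
 * A minimal usage sketch (illustrative only; 12 is a hypothetical node id, and the
 * force/lock_cooldown arguments are assumed to have defaults in the extension's
 * SQL definition, which is not part of this file):
 *
 *   SELECT spq_update_node(12, 'new-worker-1.example.com', 5432);
 */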
Datum spq_update_node(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    int32 nodeId = PG_GETARG_INT32(0);

    text* newNodeName = PG_GETARG_TEXT_P(1);
    int32 newNodePort = PG_GETARG_INT32(2);

    /*
     * force is used when an update needs to happen regardless of conflicting locks.
     * This feature is important to force the update during a failover due to failure,
     * e.g. by a high-availability system such as pg_auto_failover. The strategy is to
     * start a background worker that actively cancels backends holding conflicting
     * locks with this backend.
     *
     * Defaults to false.
     */
    bool force = PG_GETARG_BOOL(3);
    int32 lock_cooldown = PG_GETARG_INT32(4);

    char* newNodeNameString = text_to_cstring(newNodeName);

    WorkerNode* workerNodeWithSameAddress =
        FindWorkerNodeAnyCluster(newNodeNameString, newNodePort);
    if (workerNodeWithSameAddress != NULL) {
        /* a node with the given hostname and port already exists in the metadata */

        if (workerNodeWithSameAddress->nodeId == nodeId) {
            /* it's the node itself, meaning this is a noop update */
            PG_RETURN_VOID();
        } else {
            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                            errmsg("there is already another node with the specified "
                                   "hostname and port")));
        }
    }

    WorkerNode* workerNode = FindNodeAnyClusterByNodeId(nodeId);
    if (workerNode == NULL) {
        ereport(ERROR,
                (errcode(ERRCODE_NO_DATA_FOUND), errmsg("node %u not found", nodeId)));
    }

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (NodeIsSecondary(workerNode)) {
        EnsureTransactionalMetadataSyncMode();
    }

#ifdef DISABLE_OG_COMMENTS
    BackgroundWorkerHandle* handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(
        workerNode, force, lock_cooldown);
#endif

    /*
     * if we have planned statements such as prepared statements, we should clear the
     * cache so that the plan cache doesn't return the old nodename/nodeport.
     */
    ResetPlanCache();

    bool localOnly = true;
    UpdateNodeLocation(nodeId, newNodeNameString, newNodePort, localOnly);

    /* we should be able to find the new node from the metadata */
    workerNode = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort);
    Assert(workerNode->nodeId == nodeId);

    /*
     * Propagate the updated pg_dist_node entry to all metadata workers.
     * citus-ha uses spq_update_node() in a prepared transaction, and
     * we don't support coordinated prepared transactions, so we cannot
     * propagate the changes to the worker nodes here. Instead we mark
     * all metadata nodes as not-synced and ask maintenanced to do the
     * propagation.
     *
     * It is possible that the maintenance daemon does the first resync too
     * early, but that's fine, since this will start a retry loop with
     * 5 second intervals until sync is complete.
     */
    if (UnsetMetadataSyncedForAllWorkers()) {
        TriggerNodeMetadataSyncOnCommit();
    }

#ifdef DISABLE_OG_COMMENTS
    if (handle != NULL) {
        /*
         * this will be called on memory context cleanup as well, if the worker has been
         * terminated already this will be a noop
         */
        TerminateBackgroundWorker(handle);
    }
#endif

    Session_ctx::Trans().TransactionModifiedNodeMetadata = true;

    PG_RETURN_VOID();
}

/*
 * SetLockTimeoutLocally sets lockwait_timeout to the given value.
 * The setting is applied with GUC_ACTION_LOCAL, so it only lasts for the
 * current transaction.
 */
static void SetLockTimeoutLocally(int32 lockCooldown)
{
    set_config_option("lockwait_timeout", ConvertIntToString(lockCooldown),
                      (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
                      GUC_ACTION_LOCAL, true, 0, false);
}

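/*
 * UpdateNodeLocation updates the nodename and nodeport of the pg_dist_node row
 * with the given nodeId. Unless localOnly is set (or metadata sync is disabled),
 * it also propagates the change to all primary nodes with metadata by sending a
 * node delete command followed by a node insert command for the updated row.
 */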
static void UpdateNodeLocation(int32 nodeId, char* newNodeName, int32 newNodePort,
                               bool localOnly)
{
    const bool indexOK = true;

    ScanKeyData scanKey[1];
    Datum values[Natts_pg_dist_node];
    bool isnull[Natts_pg_dist_node];
    bool replace[Natts_pg_dist_node];

    Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
    TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);

    ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, BTEqualStrategyNumber, F_INT4EQ,
                Int32GetDatum(nodeId));

    SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, DistNodeNodeIdIndexId(),
                                                    indexOK, NULL, 1, scanKey);

    HeapTuple heapTuple = systable_getnext(scanDescriptor);
    if (!HeapTupleIsValid(heapTuple)) {
        ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
                               newNodeName, newNodePort)));
    }

    memset(replace, 0, sizeof(replace));

    values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
    isnull[Anum_pg_dist_node_nodeport - 1] = false;
    replace[Anum_pg_dist_node_nodeport - 1] = true;

    values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(newNodeName);
    isnull[Anum_pg_dist_node_nodename - 1] = false;
    replace[Anum_pg_dist_node_nodename - 1] = true;

    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);

    CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);

    CitusInvalidateRelcacheByRelid(DistNodeRelationId());

    CommandCounterIncrement();

    if (!localOnly && Session_ctx::Vars().EnableMetadataSync) {
        WorkerNode* updatedNode = FindWorkerNodeAnyCluster(newNodeName, newNodePort);
        Assert(updatedNode->nodeId == nodeId);

        /* send the delete command to all primary nodes with metadata */
        char* nodeDeleteCommand = NodeDeleteCommand(updatedNode->nodeId);
        SendCommandToWorkersWithMetadata(nodeDeleteCommand);

        /* send the insert command to all primary nodes with metadata */
        char* nodeInsertCommand = NodeListInsertCommand(list_make1(updatedNode));
        SendCommandToWorkersWithMetadata(nodeInsertCommand);
    }

    systable_endscan(scanDescriptor);
    table_close(pgDistNode, NoLock);
}

/*
 * get_shard_id_for_distribution_column function takes a distributed table name and a
 * distribution value, then returns the id of the shard that belongs to the given
 * table and contains the given value. This works for hash- and range-distributed
 * tables; for tables without a distribution key (e.g. reference tables) it returns
 * the id of their single shard.
 */
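/*
 * A minimal usage sketch (illustrative only; dist_table is a hypothetical
 * hash-distributed table with an integer distribution column):
 *
 *   SELECT get_shard_id_for_distribution_column('dist_table', 42);
 */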
Datum get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    ShardInterval* shardInterval = NULL;

    /*
     * To have optional parameter as NULL, we defined this UDF as not strict, therefore
     * we need to check all parameters for NULL values.
     */
    if (PG_ARGISNULL(0)) {
        ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                        errmsg("relation cannot be NULL")));
    }

    Oid relationId = PG_GETARG_OID(0);
    EnsureTablePermissions(relationId, ACL_SELECT);

    if (!IsCitusTable(relationId)) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                        errmsg("relation is not distributed")));
    }

    if (!HasDistributionKey(relationId)) {
        List* shardIntervalList = LoadShardIntervalList(relationId);
        if (shardIntervalList == NIL) {
            PG_RETURN_INT64(0);
        }

        shardInterval = (ShardInterval*)linitial(shardIntervalList);
    } else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) ||
               IsCitusTableType(relationId, RANGE_DISTRIBUTED)) {
        CitusTableCacheEntry* cacheEntry = GetCitusTableCacheEntry(relationId);

        /* if the given table is not a reference table, distributionValue cannot be NULL */
        if (PG_ARGISNULL(1)) {
            ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                            errmsg("distribution value cannot be NULL for tables other "
                                   "than reference tables.")));
        }

        Datum inputDatum = PG_GETARG_DATUM(1);
        Oid inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1);
        char* distributionValueString = DatumToString(inputDatum, inputDataType);

        Var* distributionColumn = DistPartitionKeyOrError(relationId);
        Oid distributionDataType = distributionColumn->vartype;

        Datum distributionValueDatum =
            StringToDatum(distributionValueString, distributionDataType);

        shardInterval = FindShardInterval(distributionValueDatum, cacheEntry);
    } else {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                        errmsg("finding shard id of given distribution value is only "
                               "supported for hash partitioned tables, range partitioned "
                               "tables and reference tables.")));
    }

    if (shardInterval != NULL) {
        PG_RETURN_INT64(shardInterval->shardId);
    }

    PG_RETURN_INT64(0);
}

/*
 * spq_is_coordinator returns whether the current node is a coordinator.
 * We consider the node a coordinator if its group ID is 0 and it has
 * pg_dist_node entries (only group ID 0 could indicate a worker without
 * metadata).
 */
Datum spq_is_coordinator(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    bool isCoordinator = false;

    if (GetLocalGroupId() == COORDINATOR_GROUP_ID && ActiveReadableNodeCount() > 0) {
        isCoordinator = true;
    }

    PG_RETURN_BOOL(isCoordinator);
}

/*
 * EnsureParentSessionHasExclusiveLockOnPgDistNode ensures that the session with the
 * given pid holds an Exclusive lock on pg_dist_node.
 */
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid)
{
    StringInfo checkIfParentLockCommandStr = makeStringInfo();

    int spiConnectionResult = SPI_connect();
    if (spiConnectionResult != SPI_OK_CONNECT) {
        ereport(ERROR, (errmsg("could not connect to SPI manager")));
    }

    char* checkIfParentLockCommand = "SELECT pid FROM pg_locks WHERE "
                                     "pid = %d AND database = %d AND relation = %d AND "
                                     "mode = 'ExclusiveLock' AND granted = TRUE";
    appendStringInfo(checkIfParentLockCommandStr, checkIfParentLockCommand,
                     parentSessionPid, MyDatabaseId, DistNodeRelationId());

    bool readOnly = true;
    int spiQueryResult = SPI_execute(checkIfParentLockCommandStr->data, readOnly, 0);
    if (spiQueryResult != SPI_OK_SELECT) {
        ereport(ERROR, (errmsg("execution was not successful \"%s\"",
                               checkIfParentLockCommandStr->data)));
    }

    bool parentHasExclusiveLock = SPI_processed > 0;

    SPI_finish();

    if (!parentHasExclusiveLock) {
        ereport(ERROR, (errmsg("lock is not held by the caller. Unexpected caller "
                               "for citus_internal_mark_node_not_synced")));
    }
}

/*
 * citus_internal_mark_node_not_synced unsets the metadatasynced flag for the given
 * node. It should only be called over a separate connection to localhost by
 * `MarkNodesNotSyncedInLoopBackConnection`; see that function for details.
 */
Datum citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    /* only called by superuser */
    EnsureSuperUser();

    pid_t parentSessionPid = PG_GETARG_INT32(0);

    /* fetch node by id */
    int nodeId = PG_GETARG_INT32(1);
    HeapTuple heapTuple = GetNodeByNodeId(nodeId);

    /* ensure that parent session holds Exclusive lock to pg_dist_node */
    EnsureParentSessionHasExclusiveLockOnPgDistNode(parentSessionPid);

    /*
     * We made sure parent session holds the ExclusiveLock, so we can unset
     * metadatasynced for the node safely with the relaxed lock here.
     */
    Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
    TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);

    Datum values[Natts_pg_dist_node];
    bool isnull[Natts_pg_dist_node];
    bool replace[Natts_pg_dist_node];

    memset(replace, 0, sizeof(replace));
    values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
    isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
    replace[Anum_pg_dist_node_metadatasynced - 1] = true;

    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);

    CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);

    CitusInvalidateRelcacheByRelid(DistNodeRelationId());
    CommandCounterIncrement();

    table_close(pgDistNode, NoLock);

    PG_RETURN_VOID();
}

/*
 * FindWorkerNode searches over the worker nodes and returns the workerNode
 * if it already exists. Else, the function returns NULL.
 */
WorkerNode* FindWorkerNode(const char* nodeName, int32 nodePort)
{
    HTAB* workerNodeHash = GetWorkerNodeHash();
    bool handleFound = false;

    WorkerNode searchedNode;
    strlcpy(searchedNode.workerName, nodeName, WORKER_LENGTH);
    searchedNode.workerPort = nodePort;

    WorkerNode* cachedWorkerNode = (WorkerNode*)hash_search(
        workerNodeHash, (void*)(&searchedNode), HASH_FIND, &handleFound);
    if (handleFound) {
        WorkerNode* workerNode = (WorkerNode*)palloc(sizeof(WorkerNode));
        *workerNode = *cachedWorkerNode;
        return workerNode;
    }

    return NULL;
}

/*
 * FindWorkerNodeOrError searches over the worker nodes and returns the workerNode
 * if it exists; otherwise it errors out.
 */
WorkerNode* FindWorkerNodeOrError(const char* nodeName, int32 nodePort)
{
    WorkerNode* node = FindWorkerNode(nodeName, nodePort);
    if (node == NULL) {
        ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
                        errmsg("node %s:%d not found", nodeName, nodePort)));
    }
    return node;
}

/*
 * FindWorkerNodeAnyCluster returns the workerNode no matter which cluster it is a part
 * of. FindWorkerNode, like almost every other function, acts as if nodes in other
 * clusters do not exist.
 */
WorkerNode* FindWorkerNodeAnyCluster(const char* nodeName, int32 nodePort)
{
    WorkerNode* workerNode = NULL;

    Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
    TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);

    HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
    if (heapTuple != NULL) {
        workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);
    }

    table_close(pgDistNode, NoLock);
    return workerNode;
}

/*
 * FindNodeAnyClusterByNodeId searches pg_dist_node and returns the node with
 * the nodeId. If the node can't be found, it returns NULL.
 */
static WorkerNode* FindNodeAnyClusterByNodeId(uint32 nodeId)
{
    bool includeNodesFromOtherClusters = true;
    List* nodeList = ReadDistNode(includeNodesFromOtherClusters);
    WorkerNode* node = NULL;

    foreach_declared_ptr(node, nodeList)
    {
        if (node->nodeId == nodeId) {
            return node;
        }
    }

    return NULL;
}

/*
 * FindNodeWithNodeId searches pg_dist_node and returns the node with the nodeId.
 * If the node cannot be found, this function errors out unless missingOk is true.
 */
WorkerNode* FindNodeWithNodeId(int nodeId, bool missingOk)
{
    List* nodeList = ActiveReadableNodeList();
    WorkerNode* node = NULL;

    foreach_declared_ptr(node, nodeList)
    {
        if (node->nodeId == nodeId) {
            return node;
        }
    }

    /* there isn't any node with nodeId in pg_dist_node */
    if (!missingOk) {
        elog(ERROR, "node with node id %d could not be found", nodeId);
    }

    return NULL;
}

/*
 * FindCoordinatorNodeId returns the node id of the coordinator node, or -1 if
 * the coordinator is not in the metadata.
 */
int FindCoordinatorNodeId()
{
    bool includeNodesFromOtherClusters = false;
    List* nodeList = ReadDistNode(includeNodesFromOtherClusters);
    WorkerNode* node = NULL;

    foreach_declared_ptr(node, nodeList)
    {
        if (NodeIsCoordinator(node)) {
            return node->nodeId;
        }
    }

    return -1;
}

/*
 * ReadDistNode iterates over pg_dist_node table, converts each row
 * into its memory representation (i.e., WorkerNode) and adds them into
 * a list. Lastly, the list is returned to the caller.
 *
 * It skips nodes that are not in the current cluster, unless requested to do otherwise
 * by includeNodesFromOtherClusters.
 */
List* ReadDistNode(bool includeNodesFromOtherClusters)
{
    ScanKeyData scanKey[1];
    int scanKeyCount = 0;
    List* workerNodeList = NIL;

    Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);

    SysScanDesc scanDescriptor =
        systable_beginscan(pgDistNode, InvalidOid, false, NULL, scanKeyCount, scanKey);

    TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);

    HeapTuple heapTuple = systable_getnext(scanDescriptor);
    while (HeapTupleIsValid(heapTuple)) {
        WorkerNode* workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);

        if (includeNodesFromOtherClusters ||
            strncmp(workerNode->nodeCluster, Session_ctx::Vars().CurrentCluster,
                    WORKER_LENGTH) == 0) {
            /* the coordinator acts as if it never sees nodes not in its cluster */
            workerNodeList = lappend(workerNodeList, workerNode);
        }

        heapTuple = systable_getnext(scanDescriptor);
    }

    systable_endscan(scanDescriptor);
    table_close(pgDistNode, NoLock);

    return workerNodeList;
}
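
/*
 * Usage sketch for ReadDistNode: a caller typically materializes the node list
 * once and iterates over it; the debug message below is illustrative only.
 *
 *   bool includeNodesFromOtherClusters = false;
 *   List* nodeList = ReadDistNode(includeNodesFromOtherClusters);
 *   WorkerNode* node = NULL;
 *   foreach_declared_ptr(node, nodeList)
 *   {
 *       elog(DEBUG1, "node %s:%d in group %d",
 *            node->workerName, node->workerPort, node->groupId);
 *   }
 */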

/*
 * RemoveNodeFromCluster removes the provided node from the pg_dist_node table of
 * the master node and of all nodes with metadata.
 * The call to spq_remove_node must be made by a superuser. If there are active
 * shard placements on the node, the function errors out.
 * This function also deletes all reference table placements belonging to the given
 * node from pg_dist_placement, but it does not drop the actual placements on the
 * node. It also adjusts the replication factor of the colocation group of reference
 * tables so that it equals the worker count.
 */
static void RemoveNodeFromCluster(char* nodeName, int32 nodePort)
{
    WorkerNode* workerNode = ModifiableWorkerNode(nodeName, nodePort);

    /*
     * We do not allow metadata operations on secondary nodes in nontransactional
     * sync mode.
     */
    if (NodeIsSecondary(workerNode)) {
        EnsureTransactionalMetadataSyncMode();
    }

    if (NodeIsPrimary(workerNode)) {
        ErrorIfNodeContainsNonRemovablePlacements(workerNode);

        /*
         * Delete reference table placements so they are not taken into account
         * for the check if there are placements after this.
         */
        bool localOnly = false;
        DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, localOnly);

        /*
         * Secondary nodes are read-only and 2PC is never used on them, so no
         * entries can exist in pg_dist_transaction for secondary nodes.
         */
        DeleteWorkerTransactions(workerNode);
    }

    DeleteNodeRow(workerNode->workerName, nodePort);

    /* make sure we don't have any lingering session lifespan connections */
    CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);

    if (Session_ctx::Vars().EnableMetadataSync) {
        char* nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);

        SendCommandToWorkersWithMetadata(nodeDeleteCommand);
    }
}

/*
 * ErrorIfNodeContainsNonRemovablePlacements throws an error if the input node
 * contains at least one shard placement that is the last active placement of
 * its shard.
 */
static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode* workerNode)
{
    int32 groupId = workerNode->groupId;
    List* shardPlacements = AllShardPlacementsOnNodeGroup(groupId);

    /* sort the list to prevent regression tests getting flaky */
    shardPlacements = SortList(shardPlacements, CompareGroupShardPlacements);

    GroupShardPlacement* placement = NULL;
    foreach_declared_ptr(placement, shardPlacements)
    {
        if (!PlacementHasActivePlacementOnAnotherGroup(placement)) {
            Oid relationId = RelationIdForShard(placement->shardId);
            char* qualifiedRelationName = generate_qualified_relation_name(relationId);

            ereport(ERROR, (errmsg("cannot remove or disable the node "
                                   "%s:%d because because it contains "
                                   "the only shard placement for "
                                   "shard " UINT64_FORMAT,
                                   workerNode->workerName, workerNode->workerPort,
                                   placement->shardId),
                            errdetail("One of the table(s) that prevents the operation "
                                      "complete successfully is %s",
                                      qualifiedRelationName),
                            errhint("To proceed, either drop the tables or use "
                                    "undistribute_table() function to convert "
                                    "them to local tables")));
        }
    }
}

/*
 * PlacementHasActivePlacementOnAnotherGroup returns true if there is at least
 * one more active placement of the input sourcePlacement on another group.
 */
static bool PlacementHasActivePlacementOnAnotherGroup(
    GroupShardPlacement* sourcePlacement)
{
    uint64 shardId = sourcePlacement->shardId;
    List* activePlacementList = ActiveShardPlacementList(shardId);

    bool foundActivePlacementOnAnotherGroup = false;
    ShardPlacement* activePlacement = NULL;
    foreach_declared_ptr(activePlacement, activePlacementList)
    {
        if (activePlacement->groupId != sourcePlacement->groupId) {
            foundActivePlacementOnAnotherGroup = true;
            break;
        }
    }

    return foundActivePlacementOnAnotherGroup;
}

/* CountPrimariesWithMetadata returns the number of primary nodes which have metadata. */
uint32 CountPrimariesWithMetadata(void)
{
    uint32 primariesWithMetadata = 0;
    WorkerNode* workerNode = NULL;

    HASH_SEQ_STATUS status;
    HTAB* workerNodeHash = GetWorkerNodeHash();

    hash_seq_init(&status, workerNodeHash);

    while ((workerNode = static_cast<WorkerNode*>(hash_seq_search(&status))) != NULL) {
        if (workerNode->hasMetadata && NodeIsPrimary(workerNode)) {
            primariesWithMetadata++;
        }
    }

    return primariesWithMetadata;
}

/*
 * AddNodeMetadata checks the given node information and adds the specified node to the
 * pg_dist_node table of the master and workers with metadata.
 * If the node already exists, the function returns the id of the node.
 * If not, the following procedure is followed while adding a node: If the groupId is not
 * explicitly given by the user, the function picks the group that the new node should
 * be in with respect to GroupSize. Then, the new node is inserted into the local
 * pg_dist_node as well as the nodes with hasmetadata=true if localOnly is false.
 */
static int AddNodeMetadata(char* nodeName, int32 nodePort, NodeMetadata* nodeMetadata,
                           bool* nodeAlreadyExists, bool localOnly)
{
    EnsureCoordinator();

    *nodeAlreadyExists = false;

    WorkerNode* workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
    if (workerNode != NULL) {
        /* return early without holding locks when the node already exists */
        *nodeAlreadyExists = true;

        return workerNode->nodeId;
    }

    /*
     * We are going to change pg_dist_node, prevent any concurrent reads that
     * are not tolerant to concurrent node addition by taking an exclusive
     * lock (conflicts with all but AccessShareLock).
     *
     * We may want to relax or have more fine-grained locking in the future
     * to allow users to add multiple nodes concurrently.
     */
    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    /* recheck in case 2 node additions pass the first check concurrently */
    workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
    if (workerNode != NULL) {
        *nodeAlreadyExists = true;

        return workerNode->nodeId;
    }

    if (nodeMetadata->groupId != COORDINATOR_GROUP_ID &&
        strcmp(nodeName, "localhost") != 0) {
        /*
         * User tries to add a worker with a non-localhost address. If the coordinator
         * is added with "localhost" as well, the worker won't be able to connect.
         */

        bool isCoordinatorInMetadata = false;
        WorkerNode* coordinatorNode =
            PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata);
        if (isCoordinatorInMetadata &&
            strcmp(coordinatorNode->workerName, "localhost") == 0) {
            ereport(ERROR, (errmsg("cannot add a worker node when the coordinator "
                                   "hostname is set to localhost"),
                            errdetail("Worker nodes need to be able to connect to the "
                                      "coordinator to transfer data."),
                            errhint("Use SELECT spq_set_coordinator_host('<hostname>') "
                                    "to configure the coordinator hostname")));
        }
    }

    /*
     * When adding the first worker while the coordinator has shard placements,
     * print a notice on how to drain the coordinator.
     */
    if (nodeMetadata->groupId != COORDINATOR_GROUP_ID && CoordinatorAddedAsWorkerNode() &&
        ActivePrimaryNonCoordinatorNodeCount() == 0 &&
        NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID)) {
        ereport(NOTICE, (errmsg("shards are still on the coordinator after adding the "
                                "new node")));
    }

    /* user lets Citus decide the group that the newly added node should be in */
    if (nodeMetadata->groupId == INVALID_GROUP_ID) {
        nodeMetadata->groupId = GetNextGroupId();
    }

    if (nodeMetadata->groupId == COORDINATOR_GROUP_ID) {
        /*
         * The coordinator always has the authoritative metadata; reflect this
         * fact in pg_dist_node.
         */
        nodeMetadata->hasMetadata = true;
        nodeMetadata->metadataSynced = true;

        /*
         * There is no concept of "inactive" coordinator, so hard code it.
         */
        nodeMetadata->isActive = true;
    }

    /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
    if (nodeMetadata->nodeRole != InvalidOid &&
        nodeMetadata->nodeRole == PrimaryNodeRoleId()) {
        WorkerNode* existingPrimaryNode =
            PrimaryNodeForGroup(nodeMetadata->groupId, NULL);

        if (existingPrimaryNode != NULL) {
            ereport(ERROR, (errmsg("group %d already has a primary node",
                                   nodeMetadata->groupId)));
        }
    }

    if (nodeMetadata->nodeRole == PrimaryNodeRoleId()) {
        if (strncmp(nodeMetadata->nodeCluster, WORKER_DEFAULT_CLUSTER, WORKER_LENGTH) !=
            0) {
            ereport(ERROR, (errmsg("primaries must be added to the default cluster")));
        }
    }

    /* generate the new node id from the sequence */
    int nextNodeIdInt = GetNextNodeId();

    InsertNodeRow(nextNodeIdInt, nodeName, nodePort, nodeMetadata);

    workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);

    if (Session_ctx::Vars().EnableMetadataSync && !localOnly) {
        /* send the delete command to all primary nodes with metadata */
        char* nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
        SendCommandToWorkersWithMetadata(nodeDeleteCommand);

        /* finally prepare the insert command and send it to all primary nodes */
        uint32 primariesWithMetadata = CountPrimariesWithMetadata();
        if (primariesWithMetadata != 0) {
            List* workerNodeList = list_make1(workerNode);
            char* nodeInsertCommand = NodeListInsertCommand(workerNodeList);

            SendCommandToWorkersWithMetadata(nodeInsertCommand);
        }
    }

    return workerNode->nodeId;
}
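
/*
 * Usage sketch for AddNodeMetadata (hypothetical hostname and port): a caller
 * typically starts from DefaultNodeMetadata(), overrides a few fields, and then
 * calls AddNodeMetadata.
 *
 *   NodeMetadata nodeMetadata = DefaultNodeMetadata();
 *   nodeMetadata.groupId = INVALID_GROUP_ID;   // let GetNextGroupId() pick one
 *   nodeMetadata.nodeRole = PrimaryNodeRoleId();
 *   nodeMetadata.isActive = true;
 *
 *   bool nodeAlreadyExists = false;
 *   bool localOnly = false;
 *   int nodeId = AddNodeMetadata("worker-1", 5432, &nodeMetadata,
 *                                &nodeAlreadyExists, localOnly);
 */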

/*
 * AddNodeMetadataViaMetadataContext does the same thing as AddNodeMetadata but
 * makes use of a metadata sync context to send commands to workers, supporting
 * both transactional and nontransactional sync modes.
 */
static int AddNodeMetadataViaMetadataContext(char* nodeName, int32 nodePort,
                                             NodeMetadata* nodeMetadata,
                                             bool* nodeAlreadyExists)
{
    bool localOnly = true;
    int nodeId =
        AddNodeMetadata(nodeName, nodePort, nodeMetadata, nodeAlreadyExists, localOnly);

    /* do nothing as the node already exists */
    if (*nodeAlreadyExists) {
        return nodeId;
    }

    /*
     * Create metadata sync context that is used throughout node addition
     * and activation if necessary.
     */
    WorkerNode* node = ModifiableWorkerNode(nodeName, nodePort);

    /* we should always set the active flag to true when spq_add_node is called */
    node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, BoolGetDatum(true));

    /*
     * After adding the new node, since it did not already exist, we will
     * activate it.
     * If the node is not the coordinator and not a secondary, check that
     * it is not trying to add itself as a worker.
     */
    if (node != NULL && node->groupId != COORDINATOR_GROUP_ID &&
        node->nodeRole != SecondaryNodeRoleId() && IsWorkerTheCurrentNode(node)) {
        ereport(ERROR, (errmsg("Node cannot add itself as a worker."),
                        errhint("Add the node as a coordinator by using: "
                                "SELECT spq_set_coordinator_host('%s', %d);",
                                node->workerName, node->workerPort)));
    }

    List* nodeList = list_make1(node);
    bool collectCommands = false;
    bool nodesAddedInSameTransaction = true;
    MetadataSyncContext* context =
        CreateMetadataSyncContext(nodeList, collectCommands, nodesAddedInSameTransaction);

    if (Session_ctx::Vars().EnableMetadataSync) {
        /* send the delete command to all primary nodes with metadata */
        char* nodeDeleteCommand = NodeDeleteCommand(node->nodeId);
        SendOrCollectCommandListToMetadataNodes(context, list_make1(nodeDeleteCommand));

        /* finally prepare the insert command and send it to all primary nodes */
        uint32 primariesWithMetadata = CountPrimariesWithMetadata();
        if (primariesWithMetadata != 0) {
            char* nodeInsertCommand = NULL;
            if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) {
                nodeInsertCommand = NodeListInsertCommand(nodeList);
            } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) {
                /*
                 * We need to ensure node insertion is idempotent in nontransactional
                 * sync mode.
                 */
                nodeInsertCommand = NodeListIdempotentInsertCommand(nodeList);
            }
            Assert(nodeInsertCommand != NULL);
            SendOrCollectCommandListToMetadataNodes(context,
                                                    list_make1(nodeInsertCommand));
        }
    }

    ActivateNodeList(context);

    return nodeId;
}

/*
 * SetWorkerColumn function sets the column with the specified index
 * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
 * It also sends the same node update command to the other metadata nodes.
 * If anything fails during the transaction, we roll it back.
 * Returns the new worker node after the modification.
 */
WorkerNode* SetWorkerColumn(WorkerNode* workerNode, int columnIndex, Datum value)
{
    workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);

    if (Session_ctx::Vars().EnableMetadataSync) {
        char* metadataSyncCommand =
            GetMetadataSyncCommandToSetNodeColumn(workerNode, columnIndex, value);

        SendCommandToWorkersWithMetadata(metadataSyncCommand);
    }

    return workerNode;
}
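
/*
 * Usage sketch for SetWorkerColumn (hypothetical node): marking a worker as
 * having metadata both locally and on the other metadata nodes.
 *
 *   WorkerNode* workerNode = FindWorkerNodeOrError("worker-1", 5432);
 *   workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata,
 *                                BoolGetDatum(true));
 */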

/*
 * SetNodeStateViaMetadataContext sets or unsets isactive, metadatasynced, and hasmetadata
 * flags via metadataSyncContext.
 */
static void SetNodeStateViaMetadataContext(MetadataSyncContext* context,
                                           WorkerNode* workerNode, Datum value)
{
    char* isActiveCommand = GetMetadataSyncCommandToSetNodeColumn(
        workerNode, Anum_pg_dist_node_isactive, value);
    char* metadatasyncedCommand = GetMetadataSyncCommandToSetNodeColumn(
        workerNode, Anum_pg_dist_node_metadatasynced, value);
    char* hasmetadataCommand = GetMetadataSyncCommandToSetNodeColumn(
        workerNode, Anum_pg_dist_node_hasmetadata, value);
    List* commandList =
        list_make3(isActiveCommand, metadatasyncedCommand, hasmetadataCommand);

    SendOrCollectCommandListToMetadataNodes(context, commandList);
}

/*
 * SetWorkerColumnOptional function sets the column with the specified index
 * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
 * It also sends the same node update command to the other metadata nodes, but
 * optionally, meaning that failures are ignored. Returns the new worker node
 * after the modification.
 */
WorkerNode* SetWorkerColumnOptional(WorkerNode* workerNode, int columnIndex, Datum value)
{
    char* metadataSyncCommand =
        GetMetadataSyncCommandToSetNodeColumn(workerNode, columnIndex, value);

    List* workerNodeList =
        TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, ShareLock);

    /* open connections in parallel */
    WorkerNode* worker = NULL;
    foreach_declared_ptr(worker, workerNodeList)
    {
        bool success = SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
            worker->workerName, worker->workerPort, CurrentUserName(),
            list_make1(metadataSyncCommand));

        if (!success) {
            /* metadata out of sync, mark the worker as not synced */
            ereport(WARNING, (errmsg("Updating the metadata of node %s:%d "
                                     "failed on node %s:%d. "
                                     "Metadata on %s:%d is marked as out of sync.",
                                     workerNode->workerName, workerNode->workerPort,
                                     worker->workerName, worker->workerPort,
                                     worker->workerName, worker->workerPort)));

            SetWorkerColumnLocalOnly(worker, Anum_pg_dist_node_metadatasynced,
                                     BoolGetDatum(false));
        } else if (workerNode->nodeId == worker->nodeId) {
            /*
             * If this is the node we want to update and it is updated successfully,
             * then we can safely update the flag on the coordinator as well.
             */
            SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
        }
    }

    return FindWorkerNode(workerNode->workerName, workerNode->workerPort);
}

/*
 * SetWorkerColumnLocalOnly function sets the column with the specified index
 * (see pg_dist_node.h) on the worker in pg_dist_node.
 * It returns the new worker node after the modification.
 */
WorkerNode* SetWorkerColumnLocalOnly(WorkerNode* workerNode, int columnIndex, Datum value)
{
    Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
    TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
    HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);

    Datum values[Natts_pg_dist_node];
    bool isnull[Natts_pg_dist_node];
    bool replace[Natts_pg_dist_node];

    if (heapTuple == NULL) {
        ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
                               workerNode->workerName, workerNode->workerPort)));
    }

    memset(replace, 0, sizeof(replace));
    values[columnIndex - 1] = value;
    isnull[columnIndex - 1] = false;
    replace[columnIndex - 1] = true;

    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);

    CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);

    CitusInvalidateRelcacheByRelid(DistNodeRelationId());
    CommandCounterIncrement();

    WorkerNode* newWorkerNode = TupleToWorkerNode(tupleDescriptor, heapTuple);

    table_close(pgDistNode, NoLock);

    return newWorkerNode;
}

/*
 * GetMetadataSyncCommandToSetNodeColumn checks whether the given workerNode and
 * value are valid. It then returns the necessary metadata sync command as a string.
 */
static char* GetMetadataSyncCommandToSetNodeColumn(WorkerNode* workerNode,
                                                   int columnIndex, Datum value)
{
    char* metadataSyncCommand = NULL;

    switch (columnIndex) {
        case Anum_pg_dist_node_hasmetadata: {
            ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
            metadataSyncCommand =
                NodeHasmetadataUpdateCommand(workerNode->nodeId, DatumGetBool(value));
            break;
        }

        case Anum_pg_dist_node_isactive: {
            ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");

            metadataSyncCommand =
                NodeStateUpdateCommand(workerNode->nodeId, DatumGetBool(value));
            break;
        }

        case Anum_pg_dist_node_shouldhaveshards: {
            metadataSyncCommand =
                ShouldHaveShardsUpdateCommand(workerNode->nodeId, DatumGetBool(value));
            break;
        }

        case Anum_pg_dist_node_metadatasynced: {
            ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
            metadataSyncCommand =
                NodeMetadataSyncedUpdateCommand(workerNode->nodeId, DatumGetBool(value));
            break;
        }

        default: {
            ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
                                   workerNode->workerName, workerNode->workerPort)));
        }
    }

    return metadataSyncCommand;
}

/*
 * NodeHasmetadataUpdateCommand generates and returns a SQL UPDATE command
 * that updates the hasmetadata column of pg_dist_node for the given nodeid.
 */
static char* NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata)
{
    StringInfo updateCommand = makeStringInfo();
    const char* hasMetadataString = hasMetadata ? "TRUE" : "FALSE";
    appendStringInfo(updateCommand,
                     "UPDATE pg_dist_node SET hasmetadata = %s "
                     "WHERE nodeid = %u",
                     hasMetadataString, nodeId);
    return updateCommand->data;
}

/*
 * NodeMetadataSyncedUpdateCommand generates and returns a SQL UPDATE command
 * that updates the metadatasynced column of pg_dist_node for the given nodeid.
 */
static char* NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
{
    StringInfo updateCommand = makeStringInfo();
    const char* metadataSyncedString = metadataSynced ? "TRUE" : "FALSE";
    appendStringInfo(updateCommand,
                     "UPDATE pg_dist_node SET metadatasynced = %s "
                     "WHERE nodeid = %u",
                     metadataSyncedString, nodeId);
    return updateCommand->data;
}
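
/*
 * For illustration, for node id 3 the two command builders above produce
 * statements of the following form:
 *
 *   UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 3
 *   UPDATE pg_dist_node SET metadatasynced = FALSE WHERE nodeid = 3
 */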

/*
 * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
 * is the coordinator and the value is false.
 */
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode* workerNode, Datum value,
                                               char* field)
{
    bool valueBool = DatumGetBool(value);
    if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID) {
        ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
                               "coordinator node",
                               field)));
    }
}

/*
 * SetShouldHaveShards function sets the shouldhaveshards column of the
 * specified worker in pg_dist_node. It also propagates this to the other metadata nodes.
 * It returns the new worker node after the modification.
 */
static WorkerNode* SetShouldHaveShards(WorkerNode* workerNode, bool shouldHaveShards)
{
    return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards,
                           BoolGetDatum(shouldHaveShards));
}

/*
 * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the
 * node is not found this function returns NULL.
 *
 * This function may return worker nodes from other clusters.
 */
static HeapTuple GetNodeTuple(const char* nodeName, int32 nodePort)
{
    Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
    const int scanKeyCount = 2;
    const bool indexOK = false;

    ScanKeyData scanKey[2];
    HeapTuple nodeTuple = NULL;

    ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, BTEqualStrategyNumber, F_TEXTEQ,
                CStringGetTextDatum(nodeName));
    ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, BTEqualStrategyNumber, F_INT4EQ,
                Int32GetDatum(nodePort));
    SysScanDesc scanDescriptor =
        systable_beginscan(pgDistNode, InvalidOid, indexOK, NULL, scanKeyCount, scanKey);

    HeapTuple heapTuple = systable_getnext(scanDescriptor);
    if (HeapTupleIsValid(heapTuple)) {
        nodeTuple = heap_copytuple(heapTuple);
    }

    systable_endscan(scanDescriptor);
    table_close(pgDistNode, NoLock);

    return nodeTuple;
}

/*
 * GetNodeByNodeId returns the heap tuple for the given node id by looking up the catalog.
 */
static HeapTuple GetNodeByNodeId(int32 nodeId)
{
    Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
    const int scanKeyCount = 1;
    const bool indexOK = false;

    ScanKeyData scanKey[1];
    HeapTuple nodeTuple = NULL;

    ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, BTEqualStrategyNumber, F_INT4EQ,
                Int32GetDatum(nodeId));
    SysScanDesc scanDescriptor =
        systable_beginscan(pgDistNode, InvalidOid, indexOK, NULL, scanKeyCount, scanKey);

    HeapTuple heapTuple = systable_getnext(scanDescriptor);
    if (HeapTupleIsValid(heapTuple)) {
        nodeTuple = heap_copytuple(heapTuple);
    } else {
        ereport(ERROR, (errmsg("could not find valid entry for node id %d", nodeId)));
    }

    systable_endscan(scanDescriptor);
    table_close(pgDistNode, NoLock);

    return nodeTuple;
}

/*
 * GetNextGroupId allocates and returns a unique groupId for the group
 * to be created. This allocation occurs both in shared memory and in write
 * ahead logs; writing to logs avoids the risk of having groupId collisions.
 *
 * Please note that the caller is still responsible for finalizing node data
 * and the groupId with the master node. Further note that this function relies
 * on an internal sequence created in initdb to generate unique identifiers.
 */
int32 GetNextGroupId()
{
    text* sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
    Oid sequenceId = ResolveRelationId(sequenceName, false);
    Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
    Oid savedUserId = InvalidOid;
    int savedSecurityContext = 0;

    GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
    SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);

    /* generate a new and unique groupId from the sequence */
    Datum groupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);

    SetUserIdAndSecContext(savedUserId, savedSecurityContext);
    int32 groupId = -1;
    if (shouldReturnNumeric()) {
        groupId = DatumGetInt32(
            DirectFunctionCall1(numeric_int4, NumericGetDatum(groupIdDatum)));
    } else {
        groupId = DatumGetInt32(groupIdDatum);
    }

    return groupId;
}

/*
 * GetNextNodeId allocates and returns a unique nodeId for the node
 * to be added. This allocation occurs both in shared memory and in write
 * ahead logs; writing to logs avoids the risk of having nodeId collisions.
 *
 * Please note that the caller is still responsible for finalizing node data
 * and the nodeId with the master node. Further note that this function relies
 * on an internal sequence created in initdb to generate unique identifiers.
 */
int GetNextNodeId()
{
    text* sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
    Oid sequenceId = ResolveRelationId(sequenceName, false);
    Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
    Oid savedUserId = InvalidOid;
    int savedSecurityContext = 0;
    int nextNodeId = -1;

    GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
    SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);

    /* generate a new and unique nodeId from the sequence */
    Datum nextNodeIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);

    SetUserIdAndSecContext(savedUserId, savedSecurityContext);

    if (shouldReturnNumeric()) {
        nextNodeId = DatumGetInt32(
            DirectFunctionCall1(numeric_int4, NumericGetDatum(nextNodeIdDatum)));
    } else {
        nextNodeId = DatumGetInt32(nextNodeIdDatum);
    }

    return nextNodeId;
}
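
/*
 * Sketch of how the two id generators above are combined during node addition
 * (see AddNodeMetadata); nodeName, nodePort, and nodeMetadata are assumed to be
 * provided by the caller.
 *
 *   nodeMetadata->groupId = GetNextGroupId();
 *   int nextNodeIdInt = GetNextNodeId();
 *   InsertNodeRow(nextNodeIdInt, nodeName, nodePort, nodeMetadata);
 */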

/*
 * EnsureCoordinator checks whether the current node is the coordinator. If it is
 * not, the function errors out.
 */
void EnsureCoordinator(void)
{
    int32 localGroupId = GetLocalGroupId();

    if (localGroupId != 0) {
        ereport(ERROR, (errmsg("operation is not allowed on this node"),
                        errhint("Connect to the coordinator and run it again.")));
    }
}

/*
 * EnsureCoordinatorIsInMetadata checks whether the coordinator is added to the
 * metadata, which is required for many operations.
 */
void EnsureCoordinatorIsInMetadata(void)
{
    bool isCoordinatorInMetadata = false;
    PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata);
    if (!isCoordinatorInMetadata) {
        ereport(ERROR, (errmsg("coordinator is not added to the metadata"),
                        errhint("Use SELECT spq_set_coordinator_host('<hostname>') "
                                "to configure the coordinator hostname")));
    }
}

/*
 * InsertCoordinatorIfClusterEmpty can be used to ensure Citus tables can be
 * created even on a node that has just performed CREATE EXTENSION citus;
 */
void InsertCoordinatorIfClusterEmpty(void)
{
    /* prevent concurrent node additions */
    Relation pgDistNode = table_open(DistNodeRelationId(), RowShareLock);

    if (!HasAnyNodes()) {
        /*
         * create_distributed_table being called for the first time and there are
         * no pg_dist_node records. Add a record for the coordinator.
         */
        InsertPlaceholderCoordinatorRecord();
    }

    /*
     * We release the lock, if InsertPlaceholderCoordinatorRecord was called
     * we already have a strong (RowExclusive) lock.
     */
    table_close(pgDistNode, RowShareLock);
}

/*
 * InsertPlaceholderCoordinatorRecord inserts a placeholder record for the coordinator
 * to be able to create distributed tables on a single node.
 */
static void InsertPlaceholderCoordinatorRecord(void)
{
    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    nodeMetadata.groupId = 0;
    nodeMetadata.shouldHaveShards = true;
    nodeMetadata.nodeRole = PrimaryNodeRoleId();
    nodeMetadata.nodeCluster = "default";

    bool nodeAlreadyExists = false;
    bool localOnly = false;

    /* as long as there is a single node, localhost should be ok */
    AddNodeMetadata(Session_ctx::Vars().LocalHostName,
                    g_instance.attr.attr_network.PostPortNumber, &nodeMetadata,
                    &nodeAlreadyExists, localOnly);
}

/*
 * InsertNodeRow opens the node system catalog, and inserts a new row with the
 * given values into that system catalog.
 *
 * NOTE: If you call this function you probably need to have taken a
 * ShareRowExclusiveLock then checked that you're not adding a second primary to
 * an existing group. If you don't it's possible for the metadata to become inconsistent.
 */
static void InsertNodeRow(int nodeid, char* nodeName, int32 nodePort,
                          NodeMetadata* nodeMetadata)
{
    Datum values[Natts_pg_dist_node];
    bool isNulls[Natts_pg_dist_node];

    Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
    Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);

    /* form the new node tuple */
    memset(values, 0, sizeof(values));
    memset(isNulls, false, sizeof(isNulls));

    values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
    values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
    values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
    values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
    values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeMetadata->nodeRack);
    values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(nodeMetadata->hasMetadata);
    values[Anum_pg_dist_node_metadatasynced - 1] =
        BoolGetDatum(nodeMetadata->metadataSynced);
    values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive);
    values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole);
    values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
    values[Anum_pg_dist_node_shouldhaveshards - 1] =
        BoolGetDatum(nodeMetadata->shouldHaveShards);

    Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);

    TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
    HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);

    CatalogTupleInsert(pgDistNode, heapTuple);

    CitusInvalidateRelcacheByRelid(DistNodeRelationId());

    /* increment the counter so that next command can see the row */
    CommandCounterIncrement();

    /* close relation */
    table_close(pgDistNode, NoLock);
}

/*
 * DeleteNodeRow removes the requested row from pg_dist_node table if it exists.
 */
static void DeleteNodeRow(char* nodeName, int32 nodePort)
{
    const int scanKeyCount = 2;
    bool indexOK = false;

    ScanKeyData scanKey[2];
    Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);

    /*
     * simple_heap_delete() expects that the caller has at least an
     * AccessShareLock on primary key index.
     *
     * XXX: This does not seem required, do we really need to acquire this lock?
     * Postgres doesn't acquire such locks on indexes before deleting catalog tuples.
     * Linking here the reasons we added this lock acquirement:
     * https://github.com/citusdata/citus/pull/2851#discussion_r306569462
     * https://github.com/citusdata/citus/pull/2855#discussion_r313628554
     * https://github.com/citusdata/citus/issues/1890
     */
    Relation replicaIndex =
        index_open(RelationGetPrimaryKeyIndex(pgDistNode), AccessShareLock);

    ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, BTEqualStrategyNumber, F_TEXTEQ,
                CStringGetTextDatum(nodeName));
    ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, BTEqualStrategyNumber, F_INT4EQ,
                Int32GetDatum(nodePort));

    SysScanDesc heapScan =
        systable_beginscan(pgDistNode, InvalidOid, indexOK, NULL, scanKeyCount, scanKey);

    HeapTuple heapTuple = systable_getnext(heapScan);

    if (!HeapTupleIsValid(heapTuple)) {
        ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", nodeName,
                               nodePort)));
    }

    simple_heap_delete(pgDistNode, &(heapTuple->t_self));

    systable_endscan(heapScan);

    /* ensure future commands don't use the node we just removed */
    CitusInvalidateRelcacheByRelid(DistNodeRelationId());

    /* increment the counter so that next command won't see the row */
    CommandCounterIncrement();

    index_close(replicaIndex, AccessShareLock);
    table_close(pgDistNode, NoLock);
}

/*
 * TupleToWorkerNode takes in a heap tuple from pg_dist_node, and
 * converts this tuple to an equivalent struct in memory. The function assumes
 * the caller already has locks on the tuple, and doesn't perform any locking.
 */
static WorkerNode* TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{
    Datum datumArray[Natts_pg_dist_node];
    bool isNullArray[Natts_pg_dist_node];

    Assert(!HeapTupleHasNulls(heapTuple));

    /*
     * This function can be called before "ALTER TABLE ... ADD COLUMN nodecluster ...",
     * therefore heap_deform_tuple() won't set the isNullArray for this column. We
     * initialize it true to be safe in that case.
     */
    memset(isNullArray, true, sizeof(isNullArray));

    /*
     * We use heap_deform_tuple() instead of heap_getattr() to expand tuple
     * to contain missing values when ALTER TABLE ADD COLUMN happens.
     */
    heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);

    char* nodeName = TextDatumGetCString(datumArray[Anum_pg_dist_node_nodename - 1]);
    char* nodeRack = TextDatumGetCString(datumArray[Anum_pg_dist_node_noderack - 1]);

    WorkerNode* workerNode = (WorkerNode*)palloc0(sizeof(WorkerNode));
    workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]);
    workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]);
    workerNode->groupId = DatumGetInt32(datumArray[Anum_pg_dist_node_groupid - 1]);
    strlcpy(workerNode->workerName, nodeName, WORKER_LENGTH);
    strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
    workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);
    workerNode->metadataSynced =
        DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]);
    workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
    workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
    workerNode->shouldHaveShards =
        DatumGetBool(datumArray[Anum_pg_dist_node_shouldhaveshards - 1]);

    /*
     * nodecluster column can be missing. In the case of extension creation/upgrade,
     * master_initialize_node_metadata function is called before the nodecluster
     * column is added to pg_dist_node table.
     */
    if (!isNullArray[Anum_pg_dist_node_nodecluster - 1]) {
        Name nodeClusterName =
            DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
        char* nodeClusterString = NameStr(*nodeClusterName);
        strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
    }

    return workerNode;
}

/*
 * StringToDatum transforms a string representation into a Datum.
 */
Datum StringToDatum(char* inputString, Oid dataType)
{
    Oid typIoFunc = InvalidOid;
    Oid typIoParam = InvalidOid;
    int32 typeModifier = -1;

    getTypeInputInfo(dataType, &typIoFunc, &typIoParam);
    getBaseTypeAndTypmod(dataType, &typeModifier);

    Datum datum = OidInputFunctionCall(typIoFunc, inputString, typIoParam, typeModifier);

    return datum;
}

/*
 * DatumToString returns the string representation of the given datum.
 */
char* DatumToString(Datum datum, Oid dataType)
{
    Oid typIoFunc = InvalidOid;
    bool typIsVarlena = false;

    getTypeOutputInfo(dataType, &typIoFunc, &typIsVarlena);
    char* outputString = OidOutputFunctionCall(typIoFunc, datum);

    return outputString;
}
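
/*
 * Round-trip sketch: StringToDatum and DatumToString are inverses for types
 * with standard input/output functions; INT4OID and the literal are examples.
 *
 *   Datum datum = StringToDatum("42", INT4OID);
 *   char* outputString = DatumToString(datum, INT4OID);   // "42"
 */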

/*
 * UnsetMetadataSyncedForAllWorkers sets the metadatasynced column of all metadata
 * worker nodes to false. It returns true if it updated at least one node.
 */
static bool UnsetMetadataSyncedForAllWorkers(void)
{
    bool updatedAtLeastOne = false;
    ScanKeyData scanKey[3];
    int scanKeyCount = 3;
    bool indexOK = false;

    /*
     * Concurrent spq_update_node() calls might iterate and try to update
     * pg_dist_node in different orders. To protect against deadlock, we
     * get an exclusive lock here.
     */
    Relation relation = table_open(DistNodeRelationId(), ExclusiveLock);
    TupleDesc tupleDescriptor = RelationGetDescr(relation);
    ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata, BTEqualStrategyNumber,
                F_BOOLEQ, BoolGetDatum(true));
    ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced, BTEqualStrategyNumber,
                F_BOOLEQ, BoolGetDatum(true));

    /* coordinator always has the up to date metadata */
    ScanKeyInit(&scanKey[2], Anum_pg_dist_node_groupid, BTGreaterStrategyNumber, F_INT4GT,
                Int32GetDatum(COORDINATOR_GROUP_ID));

    CatalogIndexState indstate = CatalogOpenIndexes(relation);

    SysScanDesc scanDescriptor =
        systable_beginscan(relation, InvalidOid, indexOK, NULL, scanKeyCount, scanKey);

    HeapTuple heapTuple = systable_getnext(scanDescriptor);
    if (HeapTupleIsValid(heapTuple)) {
        updatedAtLeastOne = true;
    }

    while (HeapTupleIsValid(heapTuple)) {
        Datum values[Natts_pg_dist_node];
        bool isnull[Natts_pg_dist_node];
        bool replace[Natts_pg_dist_node];

        memset(replace, false, sizeof(replace));
        memset(isnull, false, sizeof(isnull));
        memset(values, 0, sizeof(values));

        values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
        replace[Anum_pg_dist_node_metadatasynced - 1] = true;

        HeapTuple newHeapTuple =
            heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);

        CatalogTupleUpdateWithInfo(relation, &newHeapTuple->t_self, newHeapTuple,
                                   indstate);

        CommandCounterIncrement();

        heap_freetuple(newHeapTuple);

        heapTuple = systable_getnext(scanDescriptor);
    }

    systable_endscan(scanDescriptor);
    CatalogCloseIndexes(indstate);
    table_close(relation, NoLock);

    return updatedAtLeastOne;
}

/*
 * ErrorIfAnyNodeNotExist errors out if any node in the given list cannot be found.
 */
static void ErrorIfAnyNodeNotExist(List* nodeList)
{
    WorkerNode* node = NULL;
    foreach_declared_ptr(node, nodeList)
    {
        /*
         * Look the node up across all clusters; error out below if it is not
         * present in pg_dist_node.
         */
        WorkerNode* workerNode =
            FindWorkerNodeAnyCluster(node->workerName, node->workerPort);
        if (workerNode == NULL) {
            ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName,
                                   node->workerPort)));
        }
    }
}

/*
 * UpdateLocalGroupIdsViaMetadataContext updates the local group ids for the given
 * list of nodes, in transactional or nontransactional mode according to the
 * transactionMode inside the metadata sync context.
 */
static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext* context)
{
    int activatedPrimaryCount = list_length(context->activatedWorkerNodeList);
    int nodeIdx = 0;
    for (nodeIdx = 0; nodeIdx < activatedPrimaryCount; nodeIdx++) {
        WorkerNode* node =
            static_cast<WorkerNode*>(list_nth(context->activatedWorkerNodeList, nodeIdx));
        List* commandList = list_make1(LocalGroupIdUpdateCommand(node->groupId));

        /* send commands to new workers, the current user should be a superuser */
        Assert(superuser());

        SendOrCollectCommandListToSingleNode(context, commandList, nodeIdx);
    }
}

/*
 * SendDeletionCommandsForReplicatedTablePlacements sends commands that delete
 * replicated table placements to the metadata nodes, in transactional or
 * nontransactional mode according to the transactionMode inside the metadata
 * sync context.
 */
static void SendDeletionCommandsForReplicatedTablePlacements(MetadataSyncContext* context)
{
    WorkerNode* node = NULL;
    foreach_declared_ptr(node, context->activatedWorkerNodeList)
    {
        if (!node->isActive) {
            bool localOnly = false;
            int32 groupId = node->groupId;
            DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(
                context, groupId, localOnly);
        }
    }
}

/*
 * SyncNodeMetadata syncs node metadata with transactional or nontransactional
 * mode according to transactionMode inside metadataSyncContext.
 */
static void SyncNodeMetadata(MetadataSyncContext* context)
{
    CheckCitusVersion(ERROR);

    if (!Session_ctx::Vars().EnableMetadataSync) {
        return;
    }

    /*
     * Do not fail when we call this method from activate_node_snapshot
     * from workers.
     */
    if (!MetadataSyncCollectsCommands(context)) {
        EnsureCoordinator();
    }

    EnsureModificationsCanRun();
    EnsureSequentialModeMetadataOperations();

    LockRelationOid(DistNodeRelationId(), ExclusiveLock);

    /* generate the queries which drop the node metadata */
    List* dropMetadataCommandList = NodeMetadataDropCommands();

    /* generate the queries which create the node metadata from scratch */
    List* createMetadataCommandList = NodeMetadataCreateCommands();

    List* recreateNodeSnapshotCommandList = dropMetadataCommandList;
    recreateNodeSnapshotCommandList =
        list_concat(recreateNodeSnapshotCommandList, createMetadataCommandList);

    /*
     * We should have already added node metadata to metadata workers. Sync node
     * metadata just for activated workers.
     */
    SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList);
}
